#%matplotlib notebook
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext
import matplotlib.pyplot as plt
import numpy as np
import fbprophet
# Reuse the already-running SparkContext (for example the one a pyspark
# notebook provides) or start a new one, so that this line no longer relies
# on a pre-defined global named ``sc``.
sqlContext = SQLContext(SparkContext.getOrCreate())
Loading the dataset
# Path of the incident-report CSV file to load.
dataPath = '../ts-test/Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv'

# Explicit schema for the CSV: one nullable field per (name, type) pair,
# declared in the order listed below.
crimeDataSchema = StructType([
    StructField(_field_name, _field_type, True)
    for _field_name, _field_type in [
        ("IncidntNum", LongType()),
        ("Category", StringType()),
        ("Descript", StringType()),
        ("DayOfWeek", StringType()),
        ("Date", StringType()),
        ("Time", StringType()),
        ("PdDistrict", StringType()),
        ("Resolution", StringType()),
        ("Address", StringType()),
        ("X", DoubleType()),
        ("Y", DoubleType()),
        ("Location", StringType()),
        ("PdId", LongType()),
    ]
])

# Comma-delimited CSV reader that treats the first line as a header.
_csv_reader = sqlContext.read.format('csv')
_csv_reader = _csv_reader.option('delimiter', ',').option('header', 'true')
crimeDF = _csv_reader.load(dataPath, schema=crimeDataSchema)
from pyspark.sql.functions import udf, unix_timestamp, to_timestamp
import datetime
def parseDate(dateStr):
    """Build a timestamp column from 'MM/dd/yyyy' date strings.

    ``dateStr`` is handed straight to ``unix_timestamp`` together with the
    'MM/dd/yyyy' pattern; the result of that call is cast to 'timestamp'
    and returned.
    """
    epoch_seconds = unix_timestamp(dateStr, 'MM/dd/yyyy')
    return epoch_seconds.cast('timestamp')
# Parse the 'MM/dd/yyyy' strings of "Date" into timestamps. The parsed values
# are written to a temporary column which then takes over the "Date" name.
_parsed_date = unix_timestamp(crimeDF["Date"], 'MM/dd/yyyy').cast('timestamp')
crimeDF_date = crimeDF.withColumn("Date_tmp", _parsed_date)
crimeDF_date = crimeDF_date.drop("Date")
crimeDF_date = crimeDF_date.withColumnRenamed("Date_tmp", "Date")

# Number of incidents for every (Date, Category) pair, sorted by date.
crime_date_count = (
    crimeDF_date
    .groupBy("Date", "Category")
    .count()
    .select("Date", "Category", "Count")
    .orderBy("Date")
)
def intCount(c):
    """Return ``c`` converted with ``float()``.

    Despite the name, the result is a float, not an int; elsewhere in this
    file the function is wrapped in a UDF declared with DoubleType.
    """
    as_float = float(c)
    return as_float
# Convert "Count" to double with Spark's native cast rather than a Python UDF:
# the values are the same, but rows no longer have to be shipped through a
# Python worker just to call float(). The converted values go into a
# temporary column which then takes over the "Count" name, and the result is
# cached because it is queried repeatedly further down.
crime_date_count = (
    crime_date_count
    .withColumn("Count_t", crime_date_count["Count"].cast(DoubleType()))
    .drop("Count")
    .withColumnRenamed("Count_t", "Count")
    .cache()
)
The dataframe was rebuilt so that "Date" is a timestamp column and "Count" is a double column
# Preview the per-date, per-category counts.
crime_date_count.show()

# Distinct crime categories present in the data.
list_of_cats = [r.Category for r in crime_date_count.select("Category").distinct().collect()]

# One figure shared by all categories, one line per category.
plt.figure(figsize=[15,10])
plt.title('Occurence of Crimes by Category')
for cat in list_of_cats:
    data = crime_date_count.where(crime_date_count["Category"] == cat).select("Date", "Count")
    # Collect once per category: each date stays paired with its own count,
    # and only one Spark job runs instead of two.
    cat_rows = data.collect()
    dates_c = [r.Date for r in cat_rows]
    counts_c = [r.Count for r in cat_rows]
    plt.plot(dates_c, counts_c, label=cat)
# Anchor the legend just outside the right edge of the axes.
plt.legend(loc='upper left',bbox_to_anchor=(1.04,1))
plt.show()
# Earliest and latest incident dates, computed by Spark in one aggregation
# instead of collecting every row to the driver and relying on row order.
# min()/max() skip null dates; if the dataframe is empty both values are None
# rather than the lookup raising IndexError.
minDate, maxDate = crime_date_count.selectExpr(
    "min(Date) AS minDate",
    "max(Date) AS maxDate",
).first()
Divide the data into one set from the earliest date to 2017/5/15 and one from 2017/5/16 to the latest date
# Boundary dates of the split: the sample runs from minDate to 2017-05-15,
# the held-out part from 2017-05-16 to maxDate.
_sample_end = datetime.datetime(2017, 5, 15, 0, 0)
_future_start = datetime.datetime(2017, 5, 16, 0, 0)

_in_sample = crime_date_count["Date"].between(minDate, _sample_end)
_in_future = crime_date_count["Date"].between(_future_start, maxDate)

# Both parts are cached because they are filtered once per category later on.
crime_date_count_sample = crime_date_count.where(_in_sample).cache()
crime_date_count_sample_future = crime_date_count.where(_in_future).cache()

crime_date_count_sample.show()
import logging

# Only CRITICAL records pass the root logger from here on; this is done to
# keep unwanted warnings out of the plot output.
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)
# Last day of the training window; forecast rows after it are the ones
# compared with the held-out data.
cutoff = datetime.datetime(2017, 5, 15, 0, 0)

# Fit one Prophet model per crime category and plot its forecast against the
# held-out observations of that category.
for cat in list_of_cats:
    # Training data of this category in the two-column (ds, y) layout that
    # is passed to Prophet.
    data = (crime_date_count_sample
            .where(crime_date_count_sample.Category == cat)
            .withColumnRenamed('Date', 'ds')
            .withColumnRenamed('Count', 'y')
            .select('ds', 'y')
            .orderBy('ds')
            .toPandas())

    # Prophet cannot be fitted on fewer than two rows; report and skip such a
    # category instead of letting it abort the whole loop.
    if len(data) < 2:
        print("Skipping %s: fewer than 2 rows in the training window" % cat)
        continue

    model = fbprophet.Prophet()
    model.fit(data)
    future_df = model.make_future_dataframe(periods=365)
    forecast_df = model.predict(future_df)
    future_section = forecast_df.loc[forecast_df['ds'] > cutoff]

    # Held-out observations, collected once so that every date stays paired
    # with its own count and only one Spark job runs per category.
    actual_rows = (crime_date_count_sample_future
                   .where(crime_date_count_sample_future.Category == cat)
                   .select("Date", "Count")
                   .collect())

    plt.figure(figsize=[15,10])
    plt.title(cat)
    plt.plot([r.Date for r in actual_rows], [r.Count for r in actual_rows], label="Real Data")
    plt.plot(future_section['ds'], future_section['yhat_upper'], label="Predicted Upper")
    plt.plot(future_section['ds'], future_section['yhat'], label="Predicted")
    plt.plot(future_section['ds'], future_section['yhat_lower'], label="Predicted Lower")
    plt.legend()
    plt.show()
# Total number of incidents per day across all categories, sorted by date.
allCat_count = crimeDF_date.groupBy("Date").count().select("Date", "Count").orderBy("Date")

# Rename the columns to ds / y and convert the count to double. Spark's
# native cast replaces the Python UDF: the values are the same, but rows no
# longer have to be shipped through a Python worker just to call float().
allCat_count = (
    allCat_count
    .withColumn("Count_t", allCat_count["Count"].cast(DoubleType()))
    .drop("Count")
    .withColumnRenamed("Date", "ds")
    .withColumnRenamed("Count_t", "y")
    .cache()
)
allCat_count.show()
Split the data into one set to be fitted to the model and one set for testing
# Training part: minDate to 2017-05-15. Held-out part: 2017-05-16 to maxDate.
allCat_count_sample = allCat_count.where(allCat_count.ds.between(minDate, datetime.datetime(2017, 5, 15, 0, 0))).cache()
allCat_count_future = allCat_count.where(allCat_count.ds.between(datetime.datetime(2017, 5, 16, 0, 0), maxDate)).cache()

# Prophet is fitted on a pandas DataFrame, so bring the training part to the driver.
allCatCountPandas = allCat_count_sample.toPandas()
m = fbprophet.Prophet()
m.fit(allCatCountPandas)

# Extend the training dates by 365 further periods and predict over all of them.
future_df = m.make_future_dataframe(periods=365)
# Call form of print: same output on Python 2, and valid syntax on Python 3.
print(future_df.tail())
forecast_df = m.predict(future_df)
# Bare expression kept from the notebook, where it displays the dataframe.
forecast_df
Extracting the prediction for the 1-year forecast period
# Keep only the forecast rows dated after 2017-05-15.
_after_training = forecast_df['ds'] > datetime.datetime(2017, 5, 15, 0, 0)
future_section = forecast_df.loc[_after_training]
# Bare expression kept from the notebook, where it displays the dataframe.
future_section

# Plot of the complete forecast as drawn by the fitted model.
m.plot(forecast_df)
plt.show()
Plotting the prediction against the real data for the 1-year period 2017/5/16 ~ 2018/5/15
# Held-out observations, collected once so that every date stays paired with
# its own count and only one Spark job runs instead of two.
actual_rows = allCat_count_future.select("ds", "y").collect()

# Real totals against the predicted value and its upper / lower bounds.
plt.figure(figsize=[15,10])
plt.title("Prediction on Total Crime Count for 2017/5/16 ~ 2018/5/15")
plt.plot([r.ds for r in actual_rows], [r.y for r in actual_rows], label="Real Data")
plt.plot(future_section['ds'], future_section['yhat_upper'], label="Predicted Upper")
plt.plot(future_section['ds'], future_section['yhat'], label="Prediction")
plt.plot(future_section['ds'], future_section['yhat_lower'], label="Predicted Lower")
plt.legend()
plt.show()